Remove twisted from save/migrate handling.
This needs to use threads, so add thread support for
http server requests.
Signed-off-by: Mike Wray <mike.wray@hp.com>
import types
+
from xen.xend import sxp
from xen.xend import PrettyPrint
from xen.xend.Args import ArgError
import resource
import http
+import httpserver
import defer
def uri_pathlist(p):
def use_sxp(self, req):
- """Determine whether to send an SXP response to a request.
- Uses SXP if there is no User-Agent, no Accept, or application/sxp is in Accept.
-
- req request
- returns 1 for SXP, 0 otherwise
- """
- ok = 0
- user_agent = req.getHeader('User-Agent')
- accept = req.getHeader('Accept')
- if (not user_agent) or (not accept) or (accept.find(sxp.mime_type) >= 0):
- ok = 1
- return ok
-
+ return req.useSxp()
+
def get_op_method(self, op):
"""Get the method for an operation.
For operation 'foo' looks for 'op_foo'.
The method must return a list when req.use_sxp is true
and an HTML string otherwise (or list).
- Methods may also return a Deferred (for incomplete processing).
+ Methods may also return a ThreadRequest (for incomplete processing).
req request
"""
req.write("Operation not implemented: " + op)
return ''
else:
- return self._perform(op, op_method, req)
-
- def _perform(self, op, op_method, req):
- try:
- val = op_method(op, req)
- except Exception, err:
- self._perform_err(err, op, req)
- return ''
-
- if isinstance(val, defer.Deferred):
- val.addCallback(self._perform_cb, op, req, dfr=1)
- val.addErrback(self._perform_err, op, req, dfr=1)
- return server.NOT_DONE_YET
- else:
- self._perform_cb(val, op, req, dfr=0)
- return ''
-
- def _perform_cb(self, val, op, req, dfr=0):
- """Callback to complete the request.
- May be called from a Deferred.
-
- @param err: the error
- @param req: request causing the error
- @param dfr: deferred flag
- """
- if isinstance(val, resource.ErrorPage):
- req.write(val.render(req))
- elif self.use_sxp(req):
- req.setHeader("Content-Type", sxp.mime_type)
- sxp.show(val, out=req)
- else:
- req.write('<html><head></head><body>')
- self.print_path(req)
- if isinstance(val, types.ListType):
- req.write('<code><pre>')
- PrettyPrint.prettyprint(val, out=req)
- req.write('</pre></code>')
- else:
- req.write(str(val))
- req.write('</body></html>')
- if dfr:
- req.finish()
-
- def _perform_err(self, err, op, req, dfr=0):
- """Error callback to complete a request.
- May be called from a Deferred.
-
- @param err: the error
- @param req: request causing the error
- @param dfr: deferred flag
- """
- if not (isinstance(err, ArgError) or
- isinstance(err, sxp.ParseError) or
- isinstance(err, XendError)):
- if dfr:
- return err
- else:
- raise
- #log.exception("op=%s: %s", op, str(err))
- if self.use_sxp(req):
- req.setHeader("Content-Type", sxp.mime_type)
- sxp.show(['xend.err', str(err)], out=req)
- else:
- req.setHeader("Content-Type", "text/plain")
- req.write('Error ')
- req.write(': ')
- req.write(str(err))
- if dfr:
- req.finish()
-
+ return op_method(op, req)
def print_path(self, req):
"""Print the path with hyperlinks.
"""
- pathlist = [x for x in req.prepath if x != '' ]
- s = "/"
- req.write('<h1><a href="/">/</a>')
- for x in pathlist:
- s += x + "/"
- req.write(' <a href="%s">%s</a>/' % (s, x))
- req.write("</h1>")
+ req.printPath()
self.table = {}
self.order = []
- def __repr__(self):
- return "<SrvDir %x %s>" %(id(self), self.table.keys())
-
def noChild(self, msg):
return resource.ErrorPage(http.NOT_FOUND, msg=msg)
return True
def dataReceived(self, data):
+ if not self.connected:
+ return True
if not self.protocol:
return True
try:
except SystemExit:
raise
except Exception, ex:
- self.disconnect(ex)
+ self.loseConnection(ex)
return True
return False
except SystemExit:
raise
except Exception, ex:
- self.disconnect(ex)
+ self.loseConnection(ex)
def mainLoop(self):
# Something a protocol could call.
if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
return False
else:
- self.disconnect(ex)
+ self.loseConnection(ex)
return True
def read(self):
if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
return None
else:
- self.disconnect(ex)
+ self.loseConnection(ex)
return True
def dataReceived(self, data):
except SystemExit:
raise
except Exception, ex:
- self.disconnect(ex)
+ self.loseConnection(ex)
return True
return False
- def disconnect(self, reason=None):
+ def loseConnection(self, reason=None):
self.thread = None
self.closeSocket(reason)
self.closeProtocol(reason)
def __init__(self, factory):
self.factoryStarted = False
+ self.clientLost = False
+ self.clientFailed = False
self.factory = factory
self.state = "disconnected"
self.transport = None
if self.state != "disconnected":
raise socket.error(EINVAL, "cannot connect in state " + self.state)
self.state = "connecting"
+ self.clientLost = False
+ self.clientFailed = False
if not self.factoryStarted:
self.factoryStarted = True
self.factory.doStart()
- self.factory.startedConnecting()
+ self.factory.startedConnecting(self)
self.connectTransport()
+ self.state = "connected"
def stopConnecting(self):
if self.state != "connecting":
return self.factory.buildProtocol(addr)
def connectionLost(self, reason=None):
- self.factory.doStop()
+ if not self.clientLost:
+ self.clientLost = True
+ self.factory.clientConnectionLost(self, reason)
def connectionFailed(self, reason=None):
- self.factory.doStop()
+ if not self.clientFailed:
+ self.clientFailed = True
+ self.factory.clientConnectionFailed(self, reason)
header_count += 1
if line == '\r\n' or line == '\n' or line == '':
break
- #print 'parseRequestHeaders>', header_bytes
header_input = StringIO(header_bytes)
self.request_headers = Message(header_input)
self.content.seek(0,0)
def parseRequest(self):
- #print 'parseRequest>'
self.request_line = self.rin.readline()
self.parseRequestLine()
self.parseRequestHeaders()
self.setCloseConnection(connection_mode)
self.readContent()
self.parseRequestArgs()
- #print 'parseRequest<'
def setCloseConnection(self, mode):
if not mode: return
self.close_connection = True
elif (mode == 'keep-alive') and (self.http_version >= (1, 1)):
self.close_connection = False
- #print 'setCloseConnection>', mode, self.close_connection
+ def getCloseConnection(self):
+ return self.close_connection
+
def getHeader(self, k, v=None):
return self.request_headers.get(k, v)
self.response_status = status
def setResponseHeader(self, k, v):
- #print 'setResponseHeader>', k, v
k = k.lower()
self.response_headers[k] = v
if k == 'connection':
self.send("\r\n")
def sendResponse(self):
- #print 'sendResponse>'
if self.response_sent:
return
self.response_sent = True
self.output.seek(0, 0)
body = self.output.getvalue()
body_length = len(body)
- #print 'sendResponse> body=', body_length, body
self.setResponseHeader("Content-Length", body_length)
if self.http_version > (0, 9):
self.send("%s %d %s\r\n" % (self.http_version_string,
self.response_status))
self.sendResponseHeaders()
if send_body:
- #print 'sendResponse> writing body'
self.send(body)
+ self.flush()
def write(self, data):
- #print 'write>', data
self.output.write(data)
def send(self, data):
- #print 'send>', len(data), '|%s|' % data
+ #print 'send>', data
self.out.write(data)
+ def flush(self):
+ self.out.flush()
+
def hasNoBody(self):
return ((self.request_method == "HEAD") or
(self.response_code in NO_BODY_CODES) or
+import threading
+
import string
import socket
+import types
from urllib import quote, unquote
+from xen.xend import sxp
+from xen.xend.Args import ArgError
+from xen.xend.XendError import XendError
+
import http
+from resource import Resource, ErrorPage
from SrvDir import SrvDir
-class HttpServerRequest(http.HttpRequest):
+class ThreadRequest:
+ """A request to complete processing using a thread.
+ """
+
+ def __init__(self, processor, req, fn, args, kwds):
+ self.processor = processor
+ self.req = req
+ self.fn = fn
+ self.args = args
+ self.kwds = kwds
+
+ def run(self):
+ self.processor.setInThread()
+ thread = threading.Thread(target=self.main)
+ thread.setDaemon(True)
+ thread.start()
- def __init__(self, server, addr, srd, srw):
- #print 'HttpServerRequest>', addr
+ def call(self):
+ try:
+ self.fn(*self.args, **self.kwds)
+ except SystemExit:
+ raise
+ except Exception, ex:
+ self.req.resultErr(ex)
+ self.req.finish()
+
+ def main(self):
+ self.call()
+ self.processor.process()
+
+
+class RequestProcessor:
+ """Processor for requests on a connection to an http server.
+ Requests are executed synchonously unless they ask for a thread by returning
+ a ThreadRequest.
+ """
+
+ done = False
+
+ inThread = False
+
+ def __init__(self, server, sock, addr):
self.server = server
+ self.sock = sock
+ self.srd = sock.makefile('rb')
+ self.srw = sock.makefile('wb')
+ self.srvaddr = server.getServerAddr()
+
+ def isInThread(self):
+ return self.inThread
+
+ def setInThread(self):
+ self.inThread = True
+
+ def getServer(self):
+ return self.server
+
+ def getRequest(self):
+ return HttpServerRequest(self, self.srvaddr, self.srd, self.srw)
+
+ def close(self):
+ try:
+ self.sock.close()
+ except:
+ pass
+
+ def finish(self):
+ self.done = True
+ self.close()
+
+ def process(self):
+ while not self.done:
+ req = self.getRequest()
+ res = req.process()
+ if isinstance(res, ThreadRequest):
+ if self.isInThread():
+ res.call()
+ else:
+ res.run()
+ break
+ else:
+ req.finish()
+
+class HttpServerRequest(http.HttpRequest):
+ """A single request to an http server.
+ """
+
+ def __init__(self, processor, addr, srd, srw):
+ self.processor = processor
self.prepath = ''
http.HttpRequest.__init__(self, addr, srd, srw)
+ def getServer(self):
+ return self.processor.getServer()
+
def process(self):
- #print 'HttpServerRequest>process', 'path=', self.request_path
- self.prepath = []
- self.postpath = map(unquote, string.split(self.request_path[1:], '/'))
- res = self.getResource()
- self.render(res)
+ """Process the request. If the return value is a ThreadRequest
+ it is evaluated in a thread.
+ """
+ try:
+ self.prepath = []
+ self.postpath = map(unquote, string.split(self.request_path[1:], '/'))
+ resource = self.getResource()
+ return self.render(resource)
+ except SystemExit:
+ raise
+ except Exception, ex:
+ self.processError(ex)
+
+ def processError(self, ex):
+ import traceback; traceback.print_exc()
+ self.sendError(http.INTERNAL_SERVER_ERROR, msg=str(ex))
+ self.setCloseConnection('close')
+
+ def finish(self):
self.sendResponse()
- return self.close_connection
-
+ if self.close_connection:
+ self.processor.finish()
+
def prePathURL(self):
url_host = self.getRequestHostname()
port = self.getPort()
return ('%s://%s/%s' % (url_proto, url_host, url_path))
def getResource(self):
- return self.server.getResource(self)
+ return self.getServer().getResource(self)
- def render(self, res):
- #print 'HttpServerRequest>render', res
- if res is None:
+ def render(self, resource):
+ val = None
+ if resource is None:
self.sendError(http.NOT_FOUND)
else:
- res.render(self)
+ try:
+ while True:
+ val = resource.render(self)
+ if not isinstance(val, Resource):
+ break
+ val = self.result(val)
+ except SystemExit:
+ raise
+ except Exception, ex:
+ self.resultErr(ex)
+ return val
+
+ def threadRequest(self, _fn, *_args, **_kwds):
+ """Create a request to finish request processing in a thread.
+ Use this to create a ThreadRequest to return from rendering a
+ resource if you need a thread to complete processing.
+ """
+ return ThreadRequest(self.processor, self, _fn, _args, _kwds)
+
+ def result(self, val):
+ if isinstance(val, Exception):
+ return self.resultErr(val)
+ else:
+ return self.resultVal(val)
+
+ def resultVal(self, val):
+ """Callback to complete the request.
+
+ @param val: the value
+ """
+ if isinstance(val, ThreadRequest):
+ return val
+ elif self.useSxp():
+ self.setHeader("Content-Type", sxp.mime_type)
+ sxp.show(val, out=self)
+ else:
+ self.write('<html><head></head><body>')
+ self.printPath()
+ if isinstance(val, types.ListType):
+ self.write('<code><pre>')
+ PrettyPrint.prettyprint(val, out=self)
+ self.write('</pre></code>')
+ else:
+ self.write(str(val))
+ self.write('</body></html>')
+ return None
+
+ def resultErr(self, err):
+ """Error callback to complete a request.
+
+ @param err: the error
+ """
+ if not isinstance(err, (ArgError, sxp.ParseError, XendError)):
+ raise
+ #log.exception("op=%s: %s", op, str(err))
+ if self.useSxp():
+ self.setHeader("Content-Type", sxp.mime_type)
+ sxp.show(['xend.err', str(err)], out=self)
+ else:
+ self.setHeader("Content-Type", "text/plain")
+ self.write('Error ')
+ self.write(': ')
+ self.write(str(err))
+ return None
+
+ def useSxp(self):
+ """Determine whether to send an SXP response to a request.
+ Uses SXP if there is no User-Agent, no Accept, or application/sxp is in Accept.
+ returns 1 for SXP, 0 otherwise
+ """
+ ok = 0
+ user_agent = self.getHeader('User-Agent')
+ accept = self.getHeader('Accept')
+ if (not user_agent) or (not accept) or (accept.find(sxp.mime_type) >= 0):
+ ok = 1
+ return ok
+
+ def printPath(self):
+ pathlist = [x for x in self.prepath if x != '' ]
+ s = "/"
+ self.write('<h1><a href="/">/</a>')
+ for x in pathlist:
+ s += x + "/"
+ self.write(' <a href="%s">%s</a>/' % (s, x))
+ self.write("</h1>")
+
class HttpServer:
- request_queue_size = 5
+ backlog = 5
+
+ closed = False
def __init__(self, interface='', port=8080, root=None):
if root is None:
root = SrvDir()
self.interface = interface
self.port = port
- self.closed = False
self.root = root
def getRoot(self):
self.close()
def bind(self):
- #print 'bind>', self.interface, self.port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.interface, self.port))
def listen(self):
- self.socket.listen(self.request_queue_size)
+ self.socket.listen(self.backlog)
def accept(self):
return self.socket.accept()
pass
def acceptRequest(self):
- #print 'acceptRequest>'
try:
(sock, addr) = self.accept()
- #print 'acceptRequest>', sock, addr
self.processRequest(sock, addr)
except socket.error:
return
def processRequest(self, sock, addr):
- #print 'processRequest>', sock, addr
- srd = sock.makefile('rb')
- srw = sock.makefile('wb')
- srvaddr = (socket.gethostname(), self.port)
- while True:
- #print 'HttpServerRequest...'
- req = HttpServerRequest(self, srvaddr, srd, srw)
- close = req.process()
- srw.flush()
- #print 'HttpServerRequest close=', close
- if close:
- break
try:
- #print 'close...'
- sock.close()
- except:
- pass
- #print 'processRequest<', sock, addr
+ rp = RequestProcessor(self, sock, addr)
+ rp.process()
+ except SystemExit:
+ raise
+ except Exception, ex:
+ print 'HttpServer>processRequest> exception: ', ex
+ try:
+ sock.close()
+ except:
+ pass
+
+ def getServerAddr(self):
+ return (socket.gethostname(), self.port)
def getResource(self, req):
return self.root.getRequestResource(req)
class Factory:
+ """Generic protocol factory.
+ """
- def __init__(self):
- pass
+ starts = 0
- def startedConnecting(self):
- print 'ServerProtocolFactory>startedConnecting>'
+ def __init__(self):
pass
def doStart(self):
- print 'ServerProtocolFactory>doStart>'
- pass
+ if self.starts == 0:
+ self.startFactory()
+ self.starts += 1
def doStop(self):
- print 'ServerProtocolFactory>doStop>'
- pass
+ if self.starts > 0:
+ self.starts -= 1
+ else:
+ return
+ if self.starts == 0:
+ self.stopFactory()
def buildProtocol(self, addr):
- print 'ServerProtocolFactory>buildProtocol>', addr
return Protocol(self)
+ def startFactory(self):
+ pass
+
+ def stopFactory(self):
+ pass
+
class ServerFactory(Factory):
+ """Factory for server protocols.
+ """
pass
class ClientFactory(Factory):
- pass
+ """Factory for client protocols.
+ """
+
+ def startedConnecting(self, connector):
+ pass
+
+ def clientConnectionLost(self, connector, reason):
+ pass
+
+ def clientConnectionFailed(self, connector, reason):
+ pass
+
class Protocol:
else:
return None
-class TestClientFactory(Factory):
+class TestClientFactory(ClientFactory):
def buildProtocol(self, addr):
- print 'TestClientProtocolFactory>buildProtocol>', addr
+ print 'TestClientFactory>buildProtocol>', addr
return TestClientProtocol(self)
+ def startedConnecting(self, connector):
+ print 'TestClientFactory>startedConnecting>', connector
+
+ def clientConnectionLost(self, connector, reason):
+ print 'TestClientFactory>clientConnectionLost>', connector, reason
+
+ def clientConnectionFailed(self, connector, reason):
+ print 'TestClientFactory>clientConnectionFailed>', connector, reason
+
class TestClientProtocol(Protocol):
def connectionMade(self, addr):
- print 'TestProtocol>connectionMade>', addr
+ print 'TestClientProtocol>connectionMade>', addr
self.write("hello")
self.write("there")
class TestServerFactory(Factory):
def buildProtocol(self, addr):
- print 'TestServerProtocolFactory>buildProtocol>', addr
+ print 'TestServerFactory>buildProtocol>', addr
return TestServerProtocol(self)
class TestServerProtocol(Protocol):
@param id: domain id
"""
- dominfo = xen_domain(id)
+ dominfo = self.xen_domain(id)
if dominfo:
d = self.domain_by_id.get(id)
if d:
@param src: source file
@param progress: output progress if true
- @return: deferred
"""
xmigrate = XendMigrate.instance()
return xmigrate.restore_begin(src)
"""Start domain migration.
@param id: domain id
- @return: deferred
"""
# Need a cancel too?
# Don't forget to cancel restart for it.
@param id: domain id
@param dst: destination file
@param progress: output progress if true
- @return: deferred
"""
dominfo = self.domain_lookup(id)
xmigrate = XendMigrate.instance()
# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
import traceback
+import threading
import errno
import sys
import time
import types
-from twisted.internet import reactor
-from twisted.internet import defer
-#defer.Deferred.debug = 1
-from twisted.internet.protocol import Protocol
-from twisted.internet.protocol import ClientFactory
-from twisted.python.failure import Failure
+from xen.web import reactor
+from xen.web.protocol import Protocol, ClientFactory
import sxp
import XendDB
self.parser = sxp.Parser()
self.xinfo = xinfo
- def connectionMade(self):
+ def connectionMade(self, addr=None):
# Send hello.
self.request(['xfr.hello', XFR_PROTO_MAJOR, XFR_PROTO_MINOR])
# Send request.
self.xinfo.request(self)
+ # Run the transport mainLoop which reads from the peer.
+ self.transport.mainLoop()
def request(self, req):
sxp.show(req, out=self.transport)
if self.parser.at_eof():
self.loseConnection()
-
class XfrdClientFactory(ClientFactory):
"""Factory for clients of the migration/save daemon xfrd.
"""
def __init__(self, xinfo):
#ClientFactory.__init__(self)
self.xinfo = xinfo
+ self.readyCond = threading.Condition()
+ self.ready = False
+ self.err = None
+ def start(self):
+ print 'XfrdClientFactory>start>'
+ reactor.connectTCP('localhost', XFRD_PORT, self)
+ try:
+ self.readyCond.acquire()
+ while not self.ready:
+ self.readyCond.wait()
+ finally:
+ self.readyCond.release()
+ print 'XfrdClientFactory>start>', 'err=', self.err
+ if self.err:
+ raise self.err
+ return 0
+
+ def notifyReady(self):
+ try:
+ self.readyCond.acquire()
+ self.ready = True
+ self.err = self.xinfo.error_summary()
+ self.readyCond.notify()
+ finally:
+ self.readyCond.release()
+
def startedConnecting(self, connector):
pass
return Xfrd(self.xinfo)
def clientConnectionLost(self, connector, reason):
- pass
+ print "XfrdClientFactory>clientConnectionLost>", reason
+ self.notifyReady()
def clientConnectionFailed(self, connector, reason):
+ print "XfrdClientFactory>clientConnectionFailed>", reason
self.xinfo.error(reason)
+ self.notifyReady()
+
+class SuspendHandler:
+
+ def __init__(self, xinfo, vmid, timeout):
+ self.xinfo = xinfo
+ self.vmid = vmid
+ self.timeout = timeout
+ self.readyCond = threading.Condition()
+ self.ready = False
+ self.err = None
+
+ def start(self):
+ self.subscribe(on=True)
+ timer = reactor.callLater(self.timeout, self.onTimeout)
+ try:
+ self.readyCond.acquire()
+ while not self.ready:
+ self.readyCond.wait()
+ finally:
+ self.readyCond.release()
+ self.subscribe(on=False)
+ timer.cancel()
+ if self.err:
+ raise XendError(self.err)
+
+ def notifyReady(self, err=None):
+ try:
+ self.readyCond.acquire()
+ if not self.ready:
+ self.ready = True
+ self.err = err
+ self.readyCond.notify()
+ finally:
+ self.readyCond.release()
+
+ def subscribe(self, on=True):
+ # Subscribe to 'suspended' events so we can tell when the
+ # suspend completes. Subscribe to 'died' events so we can tell if
+ # the domain died.
+ if on:
+ action = eserver.subscribe
+ else:
+ action = eserver.unsubscribe
+ action('xend.domain.suspended', self.onSuspended)
+ action('xend.domain.died', self.onDied)
+
+ def onSuspended(self, e, v):
+ if v[1] != self.vmid: return
+ print 'SuspendHandler>onSuspended>', e, v
+ self.notifyReady()
+
+ def onDied(self, e, v):
+ if v[1] != self.vmid: return
+ print 'SuspendHandler>onDied>', e, v
+ self.notifyReady('Domain %s died while suspending' % self.vmid)
+
+ def onTimeout(self):
+ print 'SuspendHandler>onTimeout>'
+ self.notifyReady('Domain %s suspend timed out' % self.vmid)
class XfrdInfo:
"""Abstract class for info about a session with xfrd.
"""Suspend timeout (seconds).
We set a timeout because suspending a domain can hang."""
- timeout = 10
+ timeout = 30
def __init__(self):
from xen.xend import XendDomain
self.xd = XendDomain.instance()
- self.deferred = defer.Deferred()
self.suspended = {}
self.paused = {}
self.state = 'init'
# List of errors encountered.
self.errors = []
-
+
def vmconfig(self):
dominfo = self.xd.domain_get(self.src_dom)
if dominfo:
def add_error(self, err):
"""Add an error to the error list.
- Returns the error added (which may have been unwrapped if it
- was a Twisted Failure).
+ Returns the error added.
"""
- while isinstance(err, Failure):
- err = err.value
+ #while isinstance(err, Failure):
+ # err = err.value
if err not in self.errors:
self.errors.append(err)
return err
def error_summary(self, msg=None):
"""Get a XendError summarising the errors (if any).
"""
+ if not self.errors:
+ return None
if msg is None:
msg = "errors"
if self.errors:
return self.errors
def error(self, err):
+ print 'XfrdInfo>error>', err
self.state = 'error'
self.add_error(err)
- if not self.deferred.called:
- self.deferred.errback(self.error_summary())
def dispatch(self, xfrd, val):
-
- def cbok(v):
- if v is None: return
- sxp.show(v, out=xfrd.transport)
-
- def cberr(err):
- v = ['xfr.err', errno.EINVAL]
- sxp.show(v, out=xfrd.transport)
- self.error(err)
-
+ print 'XfrdInfo>dispatch>', val
op = sxp.name(val)
op = op.replace('.', '_')
if op.startswith('xfr_'):
fn = getattr(self, op, self.unknown)
else:
fn = self.unknown
- val = fn(xfrd, val)
- if isinstance(val, defer.Deferred):
- val.addCallback(cbok)
- val.addErrback(cberr)
- else:
- cbok(val)
+ try:
+ val = fn(xfrd, val)
+ if val:
+ sxp.show(val, out=xfrd.transport)
+ except Exception, err:
+ print 'XfrdInfo>dispatch> error:', err
+ val = ['xfr.err', errno.EINVAL]
+ sxp.show(val, out=xfrd.transport)
+ self.error(err)
def unknown(self, xfrd, val):
xfrd.loseConnection()
def xfr_err(self, xfrd, val):
# If we get an error with non-zero code the operation failed.
# An error with code zero indicates hello success.
+ print 'XfrdInfo>xfr_err>', val
v = sxp.child0(val)
err = int(sxp.child0(val))
if not err: return
return ['xfr.err', val]
def xfr_vm_suspend(self, xfrd, val):
- """Suspend a domain. Suspending takes time, so we return
- a Deferred that is called when the suspend completes.
+ """Suspend a domain.
Suspending can hang, so we set a timeout and fail if it
takes too long.
"""
try:
vmid = sxp.child0(val)
- d = defer.Deferred()
- # Subscribe to 'suspended' events so we can tell when the
- # suspend completes. Subscribe to 'died' events so we can tell if
- # the domain died. Set a timeout and error handler so the subscriptions
- # will be cleaned up if suspending hangs or there is an error.
- def onSuspended(e, v):
- if v[1] != vmid: return
- subscribe(on=0)
- if not d.called:
- d.callback(v)
-
- def onDied(e, v):
- if v[1] != vmid: return
- if not d.called:
- d.errback(XendError('Domain %s died while suspending' % vmid))
-
- def subscribe(on=1):
- if on:
- action = eserver.subscribe
- else:
- action = eserver.unsubscribe
- action('xend.domain.suspended', onSuspended)
- action('xend.domain.died', onDied)
-
- def cberr(err):
- subscribe(on=0)
- self.add_error("suspend failed")
- self.add_error(err)
- return err
-
- d.addErrback(cberr)
- d.setTimeout(self.timeout)
- subscribe()
+ h = SuspendHandler(self, vmid, self.timeout)
val = self.xd.domain_shutdown(vmid, reason='suspend')
self.suspended[vmid] = 1
- return d
+ h.start()
+ print 'xfr_vm_suspend> suspended', vmid
except Exception, err:
+ print 'xfr_vm_suspend> err', err
self.add_error("suspend failed")
self.add_error(err)
traceback.print_exc()
return ['xfr.err', val]
def connectionLost(self, reason=None):
+ print 'XfrdInfo>connectionLost>', reason
for vmid in self.suspended:
try:
self.xd.domain_destroy(vmid)
self.state = 'ok'
self.dst_dom = dom
self.xd.domain_destroy(self.src_dom)
- if not self.deferred.called:
- self.deferred.callback(self)
+ #if not self.deferred.called:
+ # self.deferred.callback(self)
def connectionLost(self, reason=None):
+ print 'XendMigrateInfo>connectionLost>', reason
XfrdInfo.connectionLost(self, reason)
if self.state =='ok':
log.info('Migrate OK: ' + str(self.sxpr()))
def xfr_save_ok(self, xfrd, val):
self.state = 'ok'
self.xd.domain_destroy(self.src_dom)
- if not self.deferred.called:
- self.deferred.callback(self)
+ #if not self.deferred.called:
+ # self.deferred.callback(self)
def connectionLost(self, reason=None):
+ print 'XendSaveInfo>connectionLost>', reason
XfrdInfo.connectionLost(self, reason)
if self.state =='ok':
log.info('Save OK: ' + str(self.sxpr()))
def session_begin(self, info):
"""Add the session to the table and start it.
- Set up callbacks to remove the session from the table
- when it finishes.
+ Remove the session from the table when it finishes.
@param info: session
@return: deferred
"""
- dfr = defer.Deferred()
- def cbok(val):
- self._delete_session(info.xid)
- if not dfr.called:
- dfr.callback(val)
- return val
- def cberr(err):
- self._delete_session(info.xid)
- if not dfr.called:
- dfr.errback(err)
- return err
self._add_session(info)
- info.deferred.addCallback(cbok)
- info.deferred.addErrback(cberr)
- xcf = XfrdClientFactory(info)
- reactor.connectTCP('localhost', XFRD_PORT, xcf)
- return dfr
+ try:
+ xcf = XfrdClientFactory(info)
+ return xcf.start()
+ finally:
+ self._delete_session(info.xid)
def migrate_begin(self, dominfo, host, port=XFRD_PORT, live=0, resource=0):
"""Begin to migrate a domain to another host.
return self.perform(req)
def render_GET(self, req):
- try:
- if self.use_sxp(req):
- req.setHeader("Content-Type", sxp.mime_type)
- sxp.show(self.info.sxpr(), out=req)
- else:
- req.write('<html><head></head><body>')
- self.print_path(req)
- #self.ls()
- req.write('<p>%s</p>' % self.info)
- req.write('<p><a href="%s">Connect to domain %d</a></p>'
- % (self.info.uri(), self.info.dom))
- self.form(req)
- req.write('</body></html>')
- return ''
- except Exception, ex:
- self._perform_err(ex, req)
+ if self.use_sxp(req):
+ req.setHeader("Content-Type", sxp.mime_type)
+ sxp.show(self.info.sxpr(), out=req)
+ else:
+ req.write('<html><head></head><body>')
+ self.print_path(req)
+ #self.ls()
+ req.write('<p>%s</p>' % self.info)
+ req.write('<p><a href="%s">Connect to domain %d</a></p>'
+ % (self.info.uri(), self.info.dom))
+ self.form(req)
+ req.write('</body></html>')
def form(self, req):
req.write('<form method="post" action="%s">' % req.prePathURL())
return v
def render_GET(self, req):
- try:
- if self.use_sxp(req):
- req.setHeader("Content-Type", sxp.mime_type)
- self.ls_console(req, 1)
- else:
- req.write("<html><head></head><body>")
- self.print_path(req)
- self.ls(req)
- self.ls_console(req)
- #self.form(req.wfile)
- req.write("</body></html>")
- return ''
- except Exception, ex:
- self._perform_err(ex, req)
+ if self.use_sxp(req):
+ req.setHeader("Content-Type", sxp.mime_type)
+ self.ls_console(req, 1)
+ else:
+ req.write("<html><head></head><body>")
+ self.print_path(req)
+ self.ls(req)
+ self.ls_console(req)
+ #self.form(req.wfile)
+ req.write("</body></html>")
def ls_console(self, req, use_sxp=0):
url = req.prePathURL()
import traceback
import time
-#from twisted.internet import pollreactor; pollreactor.install()
-from twisted.internet import reactor
-
from xen.lowlevel import xu
from xen.xend import sxp
self.daemonize()
print 'running serverthread...'
serverthread.start()
- print 'running reactor...'
- reactor.run()
except Exception, ex:
print >>sys.stderr, 'Exception starting xend:', ex
if DEBUG:
self.channelF.start()
def exit(self, rc=0):
- reactor.disconnectAll()
+ #reactor.disconnectAll()
self.channelF.stop()
# Calling sys.exit() raises a SystemExit exception, which only
# kills the current thread. Calling os._exit() makes the whole
self.perform(req)
def render_GET(self, req):
- try:
- if self.use_sxp(req):
- req.setHeader("Content-Type", "text/plain")
- req.write(self.info())
- else:
- req.write('<html><head></head><body>')
- self.print_path(req)
- req.write('<pre>')
- req.write(self.info())
- req.write('</pre></body></html>')
- return ''
- except Exception, ex:
- self._perform_err(ex, req)
+ if self.use_sxp(req):
+ req.setHeader("Content-Type", "text/plain")
+ req.write(self.info())
+ else:
+ req.write('<html><head></head><body>')
+ self.print_path(req)
+ req.write('<pre>')
+ req.write(self.info())
+ req.write('</pre></body></html>')
def info(self):
return self.xd.info()
not a domain name.
"""
fn = FormFn(self.xd.domain_configure,
- [['dom', 'int'],
+ [['dom', 'int'],
['config', 'sxpr']])
- deferred = fn(req.args, {'dom': self.dom.dom})
- return deferred
+ return fn(req.args, {'dom': self.dom.dom})
def op_unpause(self, op, req):
val = self.xd.domain_unpause(self.dom.name)
return val
def op_pause(self, op, req):
+ # Pause doesn't need a thread, but request one for testing.
+ return req.threadRequest(self.do_pause, op, req)
+
+ def do_pause(self, op, req):
val = self.xd.domain_pause(self.dom.name)
return val
def op_shutdown(self, op, req):
fn = FormFn(self.xd.domain_shutdown,
- [['dom', 'str'],
+ [['dom', 'str'],
['reason', 'str'],
- ['key', 'int']])
+ ['key', 'int']])
val = fn(req.args, {'dom': self.dom.id})
req.setResponseCode(http.ACCEPTED)
req.setHeader("Location", "%s/.." % req.prePathURL())
def op_destroy(self, op, req):
fn = FormFn(self.xd.domain_destroy,
- [['dom', 'str'],
+ [['dom', 'str'],
['reason', 'str']])
val = fn(req.args, {'dom': self.dom.id})
req.setHeader("Location", "%s/.." % req.prePathURL())
return val
def op_save(self, op, req):
+ return req.threadRequest(self.do_save, op, req)
+
+ def do_save(self, op, req):
fn = FormFn(self.xd.domain_save,
- [['dom', 'str'],
+ [['dom', 'str'],
['file', 'str']])
- deferred = fn(req.args, {'dom': self.dom.id})
- deferred.addCallback(self._op_save_cb, req)
- return deferred
-
- def _op_save_cb(self, val, req):
+ val = fn(req.args, {'dom': self.dom.id})
return 0
def op_migrate(self, op, req):
+ return req.threadRequest(self.do_migrate, op, req)
+
+ def do_migrate(self, op, req):
fn = FormFn(self.xd.domain_migrate,
- [['dom', 'str'],
+ [['dom', 'str'],
['destination', 'str'],
- ['live', 'int'],
- ['resource', 'int']])
- deferred = fn(req.args, {'dom': self.dom.id})
- deferred.addCallback(self._op_migrate_cb, req)
- return deferred
-
- def _op_migrate_cb(self, info, req):
- print '_op_migrate_cb>', info, req
+ ['live', 'int'],
+ ['resource', 'int']])
+ info = fn(req.args, {'dom': self.dom.id})
#req.setResponseCode(http.ACCEPTED)
host = info.dst_host
port = info.dst_port
dom = info.dst_dom
url = "http://%s:%d/xend/domain/%d" % (host, port, dom)
req.setHeader("Location", url)
- print '_op_migrate_cb> url=', url
+ print 'do_migrate> url=', url
return url
def op_pincpu(self, op, req):
def op_cpu_bvt_set(self, op, req):
fn = FormFn(self.xd.domain_cpu_bvt_set,
- [['dom', 'str'],
- ['mcuadv', 'int'],
- ['warpback', 'int'],
+ [['dom', 'str'],
+ ['mcuadv', 'int'],
+ ['warpback', 'int'],
['warpvalue', 'int'],
- ['warpl', 'long'],
- ['warpu', 'long']])
+ ['warpl', 'long'],
+ ['warpu', 'long']])
val = fn(req.args, {'dom': self.dom.id})
return val
def op_maxmem_set(self, op, req):
fn = FormFn(self.xd.domain_maxmem_set,
- [['dom', 'str'],
+ [['dom', 'str'],
['memory', 'int']])
val = fn(req.args, {'dom': self.dom.id})
return val
def op_device_create(self, op, req):
fn = FormFn(self.xd.domain_device_create,
- [['dom', 'str'],
+ [['dom', 'str'],
['config', 'sxpr']])
- d = fn(req.args, {'dom': self.dom.id})
- return d
+ val = fn(req.args, {'dom': self.dom.id})
+ return val
def op_device_refresh(self, op, req):
fn = FormFn(self.xd.domain_device_refresh,
- [['dom', 'str'],
+ [['dom', 'str'],
['type', 'str'],
- ['idx', 'str']])
+ ['idx', 'str']])
val = fn(req.args, {'dom': self.dom.id})
return val
def op_device_destroy(self, op, req):
fn = FormFn(self.xd.domain_device_destroy,
- [['dom', 'str'],
+ [['dom', 'str'],
['type', 'str'],
- ['idx', 'str']])
+ ['idx', 'str']])
val = fn(req.args, {'dom': self.dom.id})
return val
def op_device_configure(self, op, req):
fn = FormFn(self.xd.domain_device_configure,
- [['dom', 'str'],
+ [['dom', 'str'],
['config', 'sxpr'],
- ['idx', 'str']])
- d = fn(req.args, {'dom': self.dom.id})
- return d
+ ['idx', 'str']])
+ val = fn(req.args, {'dom': self.dom.id})
+ return val
def op_vif_credit_limit(self, op, req):
fn = FormFn(self.xd.domain_vif_credit_limit,
- [['dom', 'str'],
- ['vif', 'int'],
+ [['dom', 'str'],
+ ['vif', 'int'],
['credit', 'int'],
['period', 'int']])
val = fn(req.args, {'dom': self.dom.id})
def op_mem_target_set(self, op, req):
fn = FormFn(self.xd.domain_mem_target_set,
- [['dom', 'str'],
+ [['dom', 'str'],
['target', 'int']])
val = fn(req.args, {'dom': self.dom.id})
return val
raise XendError("Error creating domain: " + str(ex))
def _op_create_cb(self, dominfo, configstring, req):
- """Callback to handle deferred domain creation.
+ """Callback to handle domain creation.
"""
dom = dominfo.name
domurl = "%s/%s" % (req.prePathURL(), dom)
def op_restore(self, op, req):
"""Restore a domain from file.
- @return: deferred
"""
+ return req.threadRequest(self.do_restore, op, req)
+
+ def do_restore(self, op, req):
fn = FormFn(self.xd.domain_restore,
[['file', 'str']])
dominfo = fn(req.args)
- return self._op_restore_cb(dominfo, req)
-
- def _op_restore_cb(self, dominfo, req):
dom = dominfo.name
domurl = "%s/%s" % (req.prePathURL(), dom)
req.setResponseCode(http.CREATED)
return self.perform(req)
def render_GET(self, req):
- try:
- if self.use_sxp(req):
- req.setHeader("Content-Type", sxp.mime_type)
- self.ls_domain(req, 1)
- else:
- req.write("<html><head></head><body>")
- self.print_path(req)
- self.ls(req)
- self.ls_domain(req)
- self.form(req)
- req.write("</body></html>")
- return ''
- except Exception, ex:
- self._perform_err(ex, req)
+ if self.use_sxp(req):
+ req.setHeader("Content-Type", sxp.mime_type)
+ self.ls_domain(req, 1)
+ else:
+ req.write("<html><head></head><body>")
+ self.print_path(req)
+ self.ls(req)
+ self.ls_domain(req)
+ self.form(req)
+ req.write("</body></html>")
def ls_domain(self, req, use_sxp=0):
url = req.prePathURL()
return self.perform(req)
def render_GET(self, req):
- try:
- if self.use_sxp(req):
- req.setHeader("Content-Type", sxp.mime_type)
- sxp.show(['node'] + self.info(), out=req)
- else:
- url = req.prePathURL()
- if not url.endswith('/'):
- url += '/'
- req.write('<html><head></head><body>')
- self.print_path(req)
- req.write('<ul>')
- for d in self.info():
- req.write('<li> %10s: %s' % (d[0], str(d[1])))
- req.write('<li><a href="%sdmesg">Xen dmesg output</a>' % url)
- req.write('<li><a href="%slog>Xend log</a>' % url)
- req.write('</ul>')
- req.write('</body></html>')
- return ''
- except Exception, ex:
- self._perform_err(ex, req)
+ if self.use_sxp(req):
+ req.setHeader("Content-Type", sxp.mime_type)
+ sxp.show(['node'] + self.info(), out=req)
+ else:
+ url = req.prePathURL()
+ if not url.endswith('/'):
+ url += '/'
+ req.write('<html><head></head><body>')
+ self.print_path(req)
+ req.write('<ul>')
+ for d in self.info():
+ req.write('<li> %10s: %s' % (d[0], str(d[1])))
+ req.write('<li><a href="%sdmesg">Xen dmesg output</a>' % url)
+ req.write('<li><a href="%slog>Xend log</a>' % url)
+ req.write('</ul>')
+ req.write('</body></html>')
def info(self):
return self.xn.info()
return self.perform(req)
def render_GET(self, req):
- try:
- if self.use_sxp(req):
- req.setHeader("Content-Type", sxp.mime_type)
- self.ls_vnet(req, 1)
- else:
- req.write("<html><head></head><body>")
- self.print_path(req)
- self.ls(req)
- self.ls_vnet(req)
- self.form(req)
- req.write("</body></html>")
- return ''
- except Exception, ex:
- self._perform_err(ex, req)
+ if self.use_sxp(req):
+ req.setHeader("Content-Type", sxp.mime_type)
+ self.ls_vnet(req, 1)
+ else:
+ req.write("<html><head></head><body>")
+ self.print_path(req)
+ self.ls(req)
+ self.ls_vnet(req)
+ self.form(req)
+ req.write("</body></html>")
def ls_vnet(self, req, use_sxp=0):
url = req.prePathURL()
self.logfile.encoding = None
def render_GET(self, req):
- try:
- return self.logfile.render(req)
- except Exception, ex:
- self._perform_err(ex, 'log', req)
+ return self.logfile.render(req)
"""Connect the controller to the usbif control interface.
@param recreate: true if after xend restart
- @return: deferred
"""
log.debug("Connecting usbif %s", str(self))
if recreate or self.connected or self.connecting: